WorkflowsでStorage Transfer Service用コネクタを用いてCloud Storageバケット間のファイル転送を行う
概要
数あるWorkflowsのコネクタの中でも今回取り上げるのはStorage Transfer Service(以下STS)のコネクタです。
サンプルをベースにSTSコネクタを用いてWorkflowsでSTSジョブを作成し、作成したSTSジョブを起動してCloud Storageバケット間のオブジェクト移動を試してみました。
以下がSTSコネクタのサンプルです。
もしWorkflowsからのSTS操作に興味があれば読んでみてください。
やってみる
STSジョブの作成
まずSTSジョブの作成をWorkflowsからSTSコネクタで行います。
ジョブの作成は以下のコネクタです。
Method: googleapis.storagetransfer.v1.transferJobs.create
Cloud Storage間のオブジェクトコピーを行うジョブの作成は以下のyamlとなります。
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- transfer_job_name: "transferJobs/testjob"
- src_bucket_name: "コピー元バケット名"
- sink_bucket_name: "コピー先バケット名"
- create_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.create
args:
body:
name: ${transfer_job_name}
description: "Test JOB"
projectId: ${project_id}
transferSpec:
gcsDataSink:
bucketName: ${sink_bucket_name}
path: ""
gcsDataSource:
bucketName: ${src_bucket_name}
path: ""
objectConditions:
includePrefixes: ["test/"]
status: "DISABLED"
設定値の説明は以下の表となります。
設定値 | 概要 |
---|---|
name | ジョブの名称。設定しない場合は一意の名前が自動で設定される。設定する場合はtransferJobs/ で始まる必要がある |
description | ジョブの説明 |
projectId | ジョブを作成するプロジェクトID |
transferSpec | 転送内容の詳細 |
gcsDataSink | コピー先バケット |
gcsDataSource | コピー元バケット |
ObjectConditions | 転送対象をフィルタするためのプレフィックスの条件 |
status | ジョブ作成時のステータス |
ObjectConditions
でincludePrefixes
を設定することで特定の接頭辞だけを選択するように指定できます。除外する場合はexcludePrefixes
を用います。
上記の実装ではincludePrefixes
にtest/
を指定していますのでtest/test1.txt
のようにtest/
がプレフィックスとなっているオブジェクトがコピー対象となります。
上記Workflowsを実行するとSTSのジョブが作成されます。
フィルタ
の接頭辞で含める
にincludePrefixes
で指定した値が設定されていることや、コピー元・コピー先バケットが指定した通りになっていることが確認できると思います。また、status
をDISABLED
で作成したのでジョブが無効になっている状態です。有効のまま作成する場合はENABLED
を指定します。
有効に設定するには以下のコネクタ(patch)を用います。
Method: googleapis.storagetransfer.v1.transferJobs.patch
このコネクタを用いてジョブのステータスを有効(ENABLED
)にします。patch
操作ではジョブに対して様々な更新ができます。ジョブの削除もこのコネクタで行います。
- enable_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.patch
args:
jobName: ${transfer_job_name}
body:
projectId: ${project_id}
transferJob:
status: "ENABLED"
ジョブを削除する
削除する場合もpatch
を用います。
Method: googleapis.storagetransfer.v1.transferJobs.patch
status
にDELETED
を設定することでジョブが削除されます。
- delete_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.patch
args:
jobName: ${transfer_job_name}
body:
projectId: ${project_id}
transferJob:
status: "DELETED"
ジョブを実行する
実行は以下のコネクタです。
Method: googleapis.storagetransfer.v1.transferJobs.run
このコネクタは実行するだけなので引数もジョブ名・プロジェクトIDのみとシンプルなものです。
- run_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.run
args:
jobName: ${transfer_job_name}
body:
projectId: ${project_id}
result: run_result
ジョブ作成から削除まで
ジョブの作成から実行までのYAMLは以下となります。
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- transfer_job_name: "transferJobs/testjob"
- src_bucket_name: "コピー元バケット名"
- sink_bucket_name: "コピー先バケット名"
- create_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.create
args:
body:
name: ${transfer_job_name}
description: "Test JOB"
projectId: ${project_id}
transferSpec:
gcsDataSink:
bucketName: ${sink_bucket_name}
path: ""
gcsDataSource:
bucketName: ${src_bucket_name}
path: ""
objectConditions:
includePrefixes: ["test/"]
status: "DISABLED"
- enable_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.patch
args:
jobName: ${transfer_job_name}
body:
projectId: ${project_id}
transferJob:
status: "ENABLED"
- run_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.run
args:
jobName: ${transfer_job_name}
body:
projectId: ${project_id}
result: run_result
ジョブを実行するとsrc_bucket_name
で指定したバケットにtest/
のプレフィックスがついているオブジェクトがsink_bucket_name
にコピーされています。
Workflowsからスケジュール実行してみる
STSのスケジューラでの実行間隔の最短は1時間ですが、Workflows(Cloud Scheduler)の実行間隔は最短1分です。
つまりWorkflowsからSTSを実行するのようにしたらSTSの実行間隔をより高頻度にできるのではないかと思い試してみました。
STSジョブを実行するだけのワークフローを作成し、1分ごとに実行するスケジュール設定をします。
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- transfer_job_name: "transferJobs/testsamplejob"
- run_transfer_job:
call: googleapis.storagetransfer.v1.transferJobs.run
args:
jobName: ${transfer_job_name}
body:
projectId: ${project_id}
result: run_result
- the_end:
return: "SUCCESS"
STSジョブの実行履歴をみてみます。
開始時間 | ステータス |
---|---|
2024年11月2日 1:18:03 UTC+9 | 成功 |
2024年11月2日 1:17:03 UTC+9 | 成功 |
2024年11月2日 1:16:06 UTC+9 | 成功 |
2024年11月2日 1:15:02 UTC+9 | 成功 |
実行結果のスクリーンショットは以下です。
上記より1分おきに実行できていることが確認できました。
STSジョブを1時間より短い間隔で定期実行したい場合、Workflowsからの実行もアリだなと思います。
まとめ
ジョブの作成や削除はWorkflowsから行うことは少ないかもですが、実行や短い間隔でのスケジュール実行などはWorkflowsから行うこともワークロードによってはありかなと思います。
ワークフローでエラーが起きたファイルをまとめて移動する処理に使うというのもありかも。
いろいろ使い道はありそうだなと思います。
それではまた。ナマステー
参考
Storage Transfer Service 用コネクタ
Storage Transfer Service API Connector Overview